﻿using Newtonsoft.Json;
using Orleans.Streams;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace iTool.ClusterComponent
{



    public static class GlobalStateManagementWithStringKey<T,TState>
        where TState : class
    {
        static ConcurrentDictionary<string, object> _stateCache;
        static ConcurrentDictionary<string, StateSubscribeStreamHandler> _subscribe;
        static object _lock = new object();

        static GlobalStateManagementWithStringKey() 
        {
            _stateCache = new ConcurrentDictionary<string, object>();
            _subscribe = new ConcurrentDictionary<string, StateSubscribeStreamHandler>();
        }

        public async static Task OnActivateAsync(string key, object state)
        {
            string message = JsonConvert.SerializeObject(state);
            Console.WriteLine("OnActivateAsync:{0},{1}", key, message);
            state = JsonConvert.DeserializeObject<TState>(message);

            if (_stateCache.ContainsKey(key))
            {
                _stateCache[key] = state;
                return;
            }
            else
            {
                _stateCache.TryAdd(key, state);
            }

            // 订阅
            if (!_subscribe.ContainsKey(key))
            {
                lock (_lock)
                {
                    if (!_subscribe.ContainsKey(key))
                    {
                        var subscribe = new StateSubscribeStreamHandler(key);
                        _subscribe.TryAdd(key, subscribe);
                    }
                    else
                    {
                        return;
                    }
                }

                _subscribe[key].SetOnMessage(message =>
                {
                    try
                    {
                        Console.WriteLine("SetOnMessage:" + message);
                        switch (message)
                        {
                            case "deactivate":
                                break;

                            default:
                                var state = JsonConvert.DeserializeObject<TState>(message);
                                _stateCache[key] = state;
                                break;
                        }

                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("sync state error:" + ex.Message);
                    }
                });

                await _subscribe[key].StartAsync();
                Console.WriteLine("_subscribe _" + key);
            }

        }

        public async static Task OnDeactivateAsync(string key)
        {
            if (_stateCache.ContainsKey(key))
            {
                _stateCache.TryRemove(key,out _);
            }

            try
            {
                if (_subscribe.ContainsKey(key))
                {
                    var subscribe = _subscribe[key];
                    _subscribe.TryRemove(key, out _);
                    await subscribe.UnsubscribeAsync();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("UnsubscribeAsync:" + ex.Message);
            }
        }

        public static object ReadState(string key) 
        {
            if (_stateCache.ContainsKey(key))
            {
                return _stateCache[key];
            }
            else
            {
                return null;
            }
        }
    }
}
